/*

 * Licensed to the Apache Software Foundation (ASF) under one

 * or more contributor license agreements.  See the NOTICE file

 * distributed with this work for additional information

 * regarding copyright ownership.  The ASF licenses this file

 * to you under the Apache License, Version 2.0 (the

 * "License"); you may not use this file except in compliance

 * with the License.  You may obtain a copy of the License at

 *

 *     http://www.apache.org/licenses/LICENSE-2.0

 *

 * Unless required by applicable law or agreed to in writing, software

 * distributed under the License is distributed on an "AS IS" BASIS,

 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

 * See the License for the specific language governing permissions and

 * limitations under the License.

 */

package com.bff.gaia.unified.sdk.io;



import org.apache.avro.Schema;

import org.apache.avro.file.CodecFactory;

import org.apache.avro.file.DataFileConstants;

import org.apache.avro.generic.GenericDatumReader;

import org.apache.avro.generic.GenericRecord;

import org.apache.avro.io.BinaryDecoder;

import org.apache.avro.io.DatumReader;

import org.apache.avro.io.DecoderFactory;

import org.apache.avro.reflect.ReflectData;

import org.apache.avro.reflect.ReflectDatumReader;

import com.bff.gaia.unified.sdk.PipelineRunner;

import com.bff.gaia.unified.sdk.annotations.Experimental;

import com.bff.gaia.unified.sdk.coders.AvroCoder;

import com.bff.gaia.unified.sdk.coders.Coder;

import com.bff.gaia.unified.sdk.io.fs.EmptyMatchTreatment;

import com.bff.gaia.unified.sdk.io.fs.MatchResult.Metadata;

import com.bff.gaia.unified.sdk.io.fs.ResourceId;

import com.bff.gaia.unified.sdk.options.PipelineOptions;

import com.bff.gaia.unified.sdk.options.ValueProvider;

import com.bff.gaia.unified.sdk.transforms.SerializableFunction;

import com.bff.gaia.unified.sdk.values.PCollection;

import com.bff.gaia.unified.vendor.guava.com.google.common.annotations.VisibleForTesting;

import com.bff.gaia.unified.vendor.guava.com.google.common.base.MoreObjects;

import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;

import org.apache.commons.compress.compressors.snappy.SnappyCompressorInputStream;

import org.apache.commons.compress.compressors.xz.XZCompressorInputStream;

import org.apache.commons.compress.utils.CountingInputStream;

import org.apache.commons.compress.utils.IOUtils;



import javax.annotation.Nullable;

import javax.annotation.concurrent.GuardedBy;

import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamException;
import java.io.PushbackInputStream;
import java.io.Serializable;

import java.nio.ByteBuffer;

import java.nio.channels.Channels;

import java.nio.channels.ReadableByteChannel;

import java.nio.channels.SeekableByteChannel;

import java.nio.charset.StandardCharsets;

import java.util.Arrays;

import java.util.Map;

import java.util.WeakHashMap;

import java.util.zip.Inflater;

import java.util.zip.InflaterInputStream;



import static com.bff.gaia.unified.sdk.io.FileBasedSource.Mode.SINGLE_FILE_OR_SUBRANGE;

import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkArgument;
import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkNotNull;
import static com.bff.gaia.unified.vendor.guava.com.google.common.base.Preconditions.checkState;



// CHECKSTYLE.OFF: JavadocStyle



/**

 * Do not use in pipelines directly: most users should use {@link AvroIO.Read}.

 *

 * <p>A {@link FileBasedSource} for reading Avro files.

 *

 * <p>To read a {@link PCollection} of objects from one or more Avro files, use {@link

 * AvroSource#from} to specify the path(s) of the files to read. The {@link AvroSource} that is

 * returned will read objects of type {@link GenericRecord} with the schema(s) that were written at

 * file creation. To further configure the {@link AvroSource} to read with a user-defined schema, or

 * to return records of a type other than {@link GenericRecord}, use {@link

 * AvroSource#withSchema(Schema)} (using an Avro {@link Schema}), {@link

 * AvroSource#withSchema(String)} (using a JSON schema), or {@link AvroSource#withSchema(Class)} (to

 * return objects of the Avro-generated class specified).

 *

 * <p>An {@link AvroSource} can be read from using the {@link Read} transform. For example:

 *

 * <pre>{@code

 * AvroSource<MyType> source = AvroSource.from(file.toPath().toString()).withSchema(MyType.class);
 * PCollection<MyType> records = pipeline.apply(Read.from(source));

 * }</pre>

 *

 * <p>This class's implementation is based on the <a

 * href="https://avro.apache.org/docs/1.7.7/spec.html">Avro 1.7.7</a> specification and implements

 * parsing of some parts of Avro Object Container Files. The rationale for doing so is that the Avro

 * API does not provide efficient ways of computing the precise offsets of blocks within a file,

 * which is necessary to support dynamic work rebalancing. However, whenever it is possible to use

 * the Avro API in a way that supports maintaining precise offsets, this class uses the Avro API.

 *

 * <p>Avro Object Container files store records in blocks. Each block contains a collection of

 * records. Blocks may be encoded (e.g., with bzip2, deflate, snappy, etc.). Blocks are delineated

 * from one another by a 16-byte sync marker.
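 *
 * <p>Per the Avro specification, each block is laid out as:
 *
 * <pre>
 * long    count of records in the block
 * long    size in bytes of the serialized records, after the codec is applied
 * bytes   the serialized (possibly compressed) records
 * fixed   16-byte sync marker
 * </pre>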

 *

 * <p>An {@link AvroSource} for a subrange of a single file contains the records in those blocks
 * whose start offset is greater than or equal to the start offset of the source and less than the
 * end offset of the source.

 *

 * <p>To use XZ-encoded Avro files, please include an explicit dependency on {@code xz-1.8.jar},

 * which has been marked as optional in the Maven {@code sdk/pom.xml}.

 *

 * <pre>{@code

 * <dependency>

 *   <groupId>org.tukaani</groupId>

 *   <artifactId>xz</artifactId>

 *   <version>1.8</version>

 * </dependency>

 * }</pre>

 *

 * <h3>Permissions</h3>

 *

 * <p>Permission requirements depend on the {@link PipelineRunner} that is used to execute the

 * pipeline. Please refer to the documentation of corresponding {@link PipelineRunner}s for more

 * details.

 *

 * @param <T> The type of records to be read from the source.

 */

// CHECKSTYLE.ON: JavadocStyle

@Experimental(Experimental.Kind.SOURCE_SINK)

public class AvroSource<T> extends BlockBasedSource<T> {

  // Default minimum bundle size (chosen as two default-size Avro blocks to attempt to

  // ensure that every source has at least one block of records).

  // The default sync interval is 64k.

  private static final long DEFAULT_MIN_BUNDLE_SIZE = 2L * DataFileConstants.DEFAULT_SYNC_INTERVAL;



  // Use cases of AvroSource are:

  // 1) AvroSource<GenericRecord> Reading GenericRecord records with a specified schema.

  // 2) AvroSource<Foo> Reading records of a generated Avro class Foo.

  // 3) AvroSource<T> Reading GenericRecord records with an unspecified schema

  //    and converting them to type T.

  //                     |    Case 1     |    Case 2   |     Case 3    |

  // type                | GenericRecord |     Foo     | GenericRecord |

  // readerSchemaString  |    non-null   |   non-null  |     null      |

  // parseFn             |      null     |     null    |   non-null    |

  // outputCoder         |      null     |     null    |   non-null    |

  private static class Mode<T> implements Serializable {

    private final Class<?> type;



    // The JSON schema used to decode records.

    @Nullable private String readerSchemaString;



    @Nullable private final SerializableFunction<GenericRecord, T> parseFn;



    @Nullable private final Coder<T> outputCoder;



    private Mode(

        Class<?> type,

        @Nullable String readerSchemaString,

        @Nullable SerializableFunction<GenericRecord, T> parseFn,

        @Nullable Coder<T> outputCoder) {

      this.type = type;

      this.readerSchemaString = internSchemaString(readerSchemaString);

      this.parseFn = parseFn;

      this.outputCoder = outputCoder;

    }



    private void readObject(ObjectInputStream is) throws IOException, ClassNotFoundException {

      is.defaultReadObject();

      readerSchemaString = internSchemaString(readerSchemaString);

    }



    private Coder<T> getOutputCoder() {

      if (parseFn == null) {

        return AvroCoder.of((Class<T>) type, internOrParseSchemaString(readerSchemaString));

      } else {

        return outputCoder;

      }

    }



    private void validate() {

      if (parseFn == null) {

        checkArgument(

            readerSchemaString != null,

            "schema must be specified using withSchema() when not using a parse fn");

      }

    }

  }



  private static Mode<GenericRecord> readGenericRecordsWithSchema(String schema) {

    return new Mode<>(GenericRecord.class, schema, null, null);

  }



  private static <T> Mode<T> readGeneratedClasses(Class<T> clazz) {

    return new Mode<>(clazz, ReflectData.get().getSchema(clazz).toString(), null, null);

  }



  private static <T> Mode<T> parseGenericRecords(

      SerializableFunction<GenericRecord, T> parseFn, Coder<T> outputCoder) {

    return new Mode<>(GenericRecord.class, null, parseFn, outputCoder);

  }



  private final Mode<T> mode;



  /**

   * Reads from the given file name or pattern ("glob"). The returned source must be further
   * configured by calling {@link #withSchema} (or {@link #withParseFn}), since no reader schema
   * is specified initially.

   */

  public static AvroSource<GenericRecord> from(ValueProvider<String> fileNameOrPattern) {

    return new AvroSource<>(

        fileNameOrPattern,

        EmptyMatchTreatment.DISALLOW,

        DEFAULT_MIN_BUNDLE_SIZE,

        readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));

  }



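  /**
   * Like {@link #from(ValueProvider)}, but reads the single file described by the given {@link
   * Metadata}.
   */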
  public static AvroSource<GenericRecord> from(Metadata metadata) {

    return new AvroSource<>(

        metadata,

        DEFAULT_MIN_BUNDLE_SIZE,

        0,

        metadata.sizeBytes(),

        readGenericRecordsWithSchema(null /* will need to be specified in withSchema */));

  }



  /** Like {@link #from(ValueProvider)}. */

  public static AvroSource<GenericRecord> from(String fileNameOrPattern) {

    return from(ValueProvider.StaticValueProvider.of(fileNameOrPattern));

  }



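  /**
   * Sets how this source handles a file pattern that matches no files; see {@link
   * EmptyMatchTreatment}.
   */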
  public AvroSource<T> withEmptyMatchTreatment(EmptyMatchTreatment emptyMatchTreatment) {

    return new AvroSource<>(

        getFileOrPatternSpecProvider(), emptyMatchTreatment, getMinBundleSize(), mode);

  }



  /** Reads files containing records that conform to the given schema. */

  public AvroSource<GenericRecord> withSchema(String schema) {

    checkArgument(schema != null, "schema cannot be null");

    return new AvroSource<>(

        getFileOrPatternSpecProvider(),

        getEmptyMatchTreatment(),

        getMinBundleSize(),

        readGenericRecordsWithSchema(schema));

  }



  /** Like {@link #withSchema(String)}. */

  public AvroSource<GenericRecord> withSchema(Schema schema) {

    checkArgument(schema != null, "schema cannot be null");

    return withSchema(schema.toString());

  }



  /** Reads files containing records of the given class. */

  public <X> AvroSource<X> withSchema(Class<X> clazz) {

    checkArgument(clazz != null, "clazz cannot be null");

    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {

      return new AvroSource<>(

          getSingleFileMetadata(),

          getMinBundleSize(),

          getStartOffset(),

          getEndOffset(),

          readGeneratedClasses(clazz));

    }

    return new AvroSource<>(

        getFileOrPatternSpecProvider(),

        getEmptyMatchTreatment(),

        getMinBundleSize(),

        readGeneratedClasses(clazz));

  }



  /**

   * Reads {@link GenericRecord} of unspecified schema and maps them to instances of a custom type

   * using the given {@code parseFn} and encoded using the given coder.

   */

  public <X> AvroSource<X> withParseFn(

      SerializableFunction<GenericRecord, X> parseFn, Coder<X> coder) {

    checkArgument(parseFn != null, "parseFn cannot be null");
    checkArgument(coder != null, "coder cannot be null");

    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {

      return new AvroSource<>(

          getSingleFileMetadata(),

          getMinBundleSize(),

          getStartOffset(),

          getEndOffset(),

          parseGenericRecords(parseFn, coder));

    }

    return new AvroSource<>(

        getFileOrPatternSpecProvider(),

        getEmptyMatchTreatment(),

        getMinBundleSize(),

        parseGenericRecords(parseFn, coder));

  }
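
  // A minimal usage sketch for withParseFn. The "id" field and StringUtf8Coder (assumed to
  // mirror the Beam coder of the same name) are illustrative assumptions:
  //
  //   AvroSource<String> ids =
  //       AvroSource.from("/path/to/files*.avro")
  //           .withParseFn(
  //               record -> record.get("id").toString(),
  //               StringUtf8Coder.of());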



  /**

   * Sets the minimum bundle size. Refer to {@link OffsetBasedSource} for a description of {@code

   * minBundleSize} and its use.

   */

  public AvroSource<T> withMinBundleSize(long minBundleSize) {

    if (getMode() == SINGLE_FILE_OR_SUBRANGE) {

      return new AvroSource<>(

          getSingleFileMetadata(), minBundleSize, getStartOffset(), getEndOffset(), mode);

    }

    return new AvroSource<>(

        getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), minBundleSize, mode);

  }



  /** Constructor for FILEPATTERN mode. */

  private AvroSource(

      ValueProvider<String> fileNameOrPattern,

      EmptyMatchTreatment emptyMatchTreatment,

      long minBundleSize,

      Mode<T> mode) {

    super(fileNameOrPattern, emptyMatchTreatment, minBundleSize);

    this.mode = mode;

  }



  /** Constructor for SINGLE_FILE_OR_SUBRANGE mode. */

  private AvroSource(

      Metadata metadata, long minBundleSize, long startOffset, long endOffset, Mode<T> mode) {

    super(metadata, minBundleSize, startOffset, endOffset);

    this.mode = mode;

  }



  @Override

  public void validate() {

    super.validate();

    mode.validate();

  }



  /**

   * Used by the Dataflow worker. Do not introduce new usages. Do not delete without confirming that

   * Dataflow ValidatesRunner tests pass.

   *

   * @deprecated Used by Dataflow worker

   */

  @Deprecated

  public BlockBasedSource<T> createForSubrangeOfFile(String fileName, long start, long end)

      throws IOException {

    return createForSubrangeOfFile(FileSystems.matchSingleFileSpec(fileName), start, end);

  }



  @Override

  public BlockBasedSource<T> createForSubrangeOfFile(Metadata fileMetadata, long start, long end) {

    return new AvroSource<>(fileMetadata, getMinBundleSize(), start, end, mode);

  }



  @Override

  protected BlockBasedReader<T> createSingleFileReader(PipelineOptions options) {

    return new AvroReader<>(this);

  }



  @Override

  public Coder<T> getOutputCoder() {

    return mode.getOutputCoder();

  }



  @VisibleForTesting

  @Nullable

  String getReaderSchemaString() {

    return mode.readerSchemaString;

  }



  /** Avro file metadata. */

  @VisibleForTesting

  static class AvroMetadata {

    private final byte[] syncMarker;

    private final String codec;

    private final String schemaString;



    AvroMetadata(byte[] syncMarker, String codec, String schemaString) {

      this.syncMarker = checkNotNull(syncMarker, "syncMarker");

      this.codec = checkNotNull(codec, "codec");

      this.schemaString = internSchemaString(checkNotNull(schemaString, "schemaString"));

    }



    /**

     * The JSON-encoded <a href="https://avro.apache.org/docs/1.7.7/spec.html#schemas">schema</a>

     * string for the file.

     */

    public String getSchemaString() {

      return schemaString;

    }



    /**

     * The <a href="https://avro.apache.org/docs/1.7.7/spec.html#Required+Codecs">codec</a> of the

     * file.

     */

    public String getCodec() {

      return codec;

    }



    /**

     * The 16-byte sync marker for the file. See the documentation for <a

     * href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object Container

     * File</a> for more information.

     */

    public byte[] getSyncMarker() {

      return syncMarker;

    }

  }



  /**

   * Reads the {@link AvroMetadata} from the header of an Avro file.

   *

   * <p>This method parses the header of an Avro <a

   * href="https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files">Object Container

   * File</a>.

   *
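   * <p>The header consists of:
   *
   * <pre>
   * fixed   4-byte magic: 'O', 'b', 'j', 0x01
   * map     file metadata, including "avro.schema" and "avro.codec"
   * fixed   16-byte sync marker
   * </pre>
   *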

   * @throws IOException if the file is not a valid Avro Object Container File.

   */

  @VisibleForTesting

  static AvroMetadata readMetadataFromFile(ResourceId fileResource) throws IOException {

    String codec = null;

    String schemaString = null;

    byte[] syncMarker;

    try (InputStream stream = Channels.newInputStream(FileSystems.open(fileResource))) {

      BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);



      // The header of an object container file begins with a four-byte magic number, followed

      // by the file metadata (including the schema and codec), encoded as a map. Finally, the

      // header ends with the file's 16-byte sync marker.

      // See https://avro.apache.org/docs/1.7.7/spec.html#Object+Container+Files for details on

      // the encoding of container files.



      // Read the magic number.

      byte[] magic = new byte[DataFileConstants.MAGIC.length];

      decoder.readFixed(magic);

      if (!Arrays.equals(magic, DataFileConstants.MAGIC)) {

        throw new IOException("Missing Avro file signature: " + fileResource);

      }



      // Read the metadata to find the codec and schema.

      ByteBuffer valueBuffer = ByteBuffer.allocate(512);

      long numRecords = decoder.readMapStart();

      while (numRecords > 0) {

        for (long recordIndex = 0; recordIndex < numRecords; recordIndex++) {

          String key = decoder.readString();

          // readBytes() clears the buffer and returns a buffer where:

          // - position is the start of the bytes read

          // - limit is the end of the bytes read

          valueBuffer = decoder.readBytes(valueBuffer);

          byte[] bytes = new byte[valueBuffer.remaining()];

          valueBuffer.get(bytes);

          if (key.equals(DataFileConstants.CODEC)) {

            codec = new String(bytes, StandardCharsets.UTF_8);

          } else if (key.equals(DataFileConstants.SCHEMA)) {

            schemaString = new String(bytes, StandardCharsets.UTF_8);

          }

        }

        numRecords = decoder.mapNext();

      }

      if (codec == null) {

        codec = DataFileConstants.NULL_CODEC;

      }



      // Finally, read the sync marker.

      syncMarker = new byte[DataFileConstants.SYNC_SIZE];

      decoder.readFixed(syncMarker);

    }

    checkState(schemaString != null, "No schema present in Avro file metadata %s", fileResource);

    return new AvroMetadata(syncMarker, codec, schemaString);

  }
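
  // A usage sketch; the file path is illustrative:
  //
  //   AvroMetadata metadata =
  //       readMetadataFromFile(
  //           FileSystems.matchSingleFileSpec("/path/to/file.avro").resourceId());
  //   String codec = metadata.getCodec(); // e.g. "deflate", or "null" for uncompressed files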



  // A logical reference cache used to store schemas and schema strings to allow us to

  // "intern" values and reduce the number of copies of equivalent objects.

  private static final Map<String, Schema> schemaLogicalReferenceCache = new WeakHashMap<>();

  private static final Map<String, String> schemaStringLogicalReferenceCache = new WeakHashMap<>();



  // We avoid String.intern() because, depending on the JVM, interned strings may be stored in
  // PermGen space, which we could otherwise exhaust.

  private static synchronized String internSchemaString(String schema) {

    String internSchema = schemaStringLogicalReferenceCache.get(schema);

    if (internSchema != null) {

      return internSchema;

    }

    schemaStringLogicalReferenceCache.put(schema, schema);

    return schema;

  }



  private static synchronized Schema internOrParseSchemaString(String schemaString) {

    Schema schema = schemaLogicalReferenceCache.get(schemaString);

    if (schema != null) {

      return schema;

    }

    Schema.Parser parser = new Schema.Parser();

    schema = parser.parse(schemaString);

    schemaLogicalReferenceCache.put(schemaString, schema);

    return schema;

  }
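
  // Because lookups key on string equality, equal schema strings resolve to the same Schema
  // instance:
  //
  //   Schema first = internOrParseSchemaString(jsonSchema);
  //   Schema second = internOrParseSchemaString(jsonSchema);
  //   // first == second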



  // Java deserialization does not go through the constructor, so we use readResolve to replace
  // the deserialized instance with one built through the constructor, allowing us to intern any
  // schemas.

  @SuppressWarnings("unused")

  private Object readResolve() throws ObjectStreamException {

    switch (getMode()) {

      case SINGLE_FILE_OR_SUBRANGE:

        return new AvroSource<>(

            getSingleFileMetadata(), getMinBundleSize(), getStartOffset(), getEndOffset(), mode);

      case FILEPATTERN:

        return new AvroSource<>(

            getFileOrPatternSpecProvider(), getEmptyMatchTreatment(), getMinBundleSize(), mode);

      default:

        throw new InvalidObjectException(

            String.format("Unknown mode %s for AvroSource %s", getMode(), this));

    }

  }



  /**

   * A {@link BlockBasedSource.Block} of Avro records.

   *

   * @param <T> The type of records stored in the block.

   */

  @Experimental(Experimental.Kind.SOURCE_SINK)

  static class AvroBlock<T> extends Block<T> {

    private final Mode<T> mode;



    // The number of records in the block.

    private final long numRecords;



    // The current record in the block. Initialized in readNextRecord.

    @Nullable private T currentRecord;



    // The index of the current record in the block.

    private long currentRecordIndex = 0;



    // A DatumReader to read records from the block.

    private final DatumReader<?> reader;



    // A BinaryDecoder used by the reader to decode records.

    private final BinaryDecoder decoder;



    /**

     * Decodes a byte array as an InputStream. The byte array may be compressed using some codec.

     * Reads from the returned stream will result in decompressed bytes.

     *

     * <p>This supports the same codecs as Avro's {@link CodecFactory}, namely those defined in

     * {@link DataFileConstants}.

     *

     * <ul>

     *   <li>"snappy" : Google's Snappy compression

     *   <li>"deflate" : deflate compression

     *   <li>"bzip2" : Bzip2 compression

     *   <li>"xz" : xz compression

     *   <li>"null" (the string, not the value): Uncompressed data

     * </ul>

     */

    private static InputStream decodeAsInputStream(byte[] data, String codec) throws IOException {

      ByteArrayInputStream byteStream = new ByteArrayInputStream(data);

      switch (codec) {

        case DataFileConstants.SNAPPY_CODEC:

          return new SnappyCompressorInputStream(byteStream, 1 << 16 /* Avro uses 64KB blocks */);

        case DataFileConstants.DEFLATE_CODEC:

          // nowrap == true: Do not expect ZLIB header or checksum, as Avro does not write them.

          Inflater inflater = new Inflater(true);

          return new InflaterInputStream(byteStream, inflater);

        case DataFileConstants.XZ_CODEC:

          return new XZCompressorInputStream(byteStream);

        case DataFileConstants.BZIP2_CODEC:

          return new BZip2CompressorInputStream(byteStream);

        case DataFileConstants.NULL_CODEC:

          return byteStream;

        default:

          throw new IllegalArgumentException("Unsupported codec: " + codec);

      }

    }



    AvroBlock(byte[] data, long numRecords, Mode<T> mode, String writerSchemaString, String codec)

        throws IOException {

      this.mode = mode;

      this.numRecords = numRecords;

      checkNotNull(writerSchemaString, "writerSchemaString");

      Schema writerSchema = internOrParseSchemaString(writerSchemaString);

      Schema readerSchema =

          internOrParseSchemaString(

              MoreObjects.firstNonNull(mode.readerSchemaString, writerSchemaString));

      this.reader =

          (mode.type == GenericRecord.class)

              ? new GenericDatumReader<T>(writerSchema, readerSchema)

              : new ReflectDatumReader<T>(writerSchema, readerSchema);

      this.decoder = DecoderFactory.get().binaryDecoder(decodeAsInputStream(data, codec), null);

    }



    @Override

    public T getCurrentRecord() {

      return currentRecord;

    }



    @Override

    public boolean readNextRecord() throws IOException {

      if (currentRecordIndex >= numRecords) {

        return false;

      }

      Object record = reader.read(null, decoder);

      currentRecord =

          (mode.parseFn == null) ? ((T) record) : mode.parseFn.apply((GenericRecord) record);

      currentRecordIndex++;

      return true;

    }



    @Override

    public double getFractionOfBlockConsumed() {

      return ((double) currentRecordIndex) / numRecords;

    }

  }



  /**

   * A {@link BlockBasedSource.BlockBasedReader} for reading blocks from Avro files.

   *

   * <p>An Avro Object Container File consists of a header followed by a 16-byte sync marker and
   * then a sequence of blocks, where each block begins with two encoded longs representing the
   * total number of records in the block and the block's size in bytes, followed by the block's
   * (optionally-compressed) records. Each block is terminated by a 16-byte sync marker.

   *

   * @param <T> The type of records contained in the block.

   */

  @Experimental(Experimental.Kind.SOURCE_SINK)

  public static class AvroReader<T> extends BlockBasedReader<T> {

    // Initialized in startReading.

    @Nullable private AvroMetadata metadata;



    // The current block.

    // Initialized in readNextRecord.

    @Nullable private AvroBlock<T> currentBlock;



    // A lock guarding the current block's offset and size, used for progress reporting and
    // getSplitPointsRemaining.

    private final Object progressLock = new Object();



    // Offset of the current block.

    @GuardedBy("progressLock")

    private long currentBlockOffset = 0;



    // Size of the current block.

    @GuardedBy("progressLock")

    private long currentBlockSizeBytes = 0;



    // Stream used to read from the underlying file.

    // A pushback stream is used to restore bytes buffered during seeking.

    // Initialized in startReading.

    @Nullable private PushbackInputStream stream;



    // Counts the number of bytes read. Used only to tell how many bytes are taken up in

    // a block's variable-length header.

    // Initialized in startReading.

    @Nullable private CountingInputStream countStream;



    // Caches the Avro DirectBinaryDecoder used to decode binary-encoded values from the buffer.

    // Initialized in readNextBlock.

    @Nullable private BinaryDecoder decoder;



    /** Reads Avro records of type {@code T} from the specified source. */

    public AvroReader(AvroSource<T> source) {

      super(source);

    }



    @Override

    public synchronized AvroSource<T> getCurrentSource() {

      return (AvroSource<T>) super.getCurrentSource();

    }



    // Precondition: the stream is positioned after the sync marker in the current (about to be

    // previous) block. currentBlockSize equals the size of the current block, or zero if this

    // reader was just started.

    //

    // Postcondition: same as above, but for the new current (formerly next) block.

    @Override

    public boolean readNextBlock() throws IOException {

      long startOfNextBlock;

      synchronized (progressLock) {

        startOfNextBlock = currentBlockOffset + currentBlockSizeBytes;

      }



      // Before reading the variable-sized block header, record the current number of bytes read.

      long preHeaderCount = countStream.getBytesRead();

      decoder = DecoderFactory.get().directBinaryDecoder(countStream, decoder);

      long numRecords;

      try {

        numRecords = decoder.readLong();

      } catch (EOFException e) {

        // Expected for the last block: its start position is at EOF, and the only way to detect
        // the end of the stream is to attempt to read from it.

        return false;

      }

      long blockSize = decoder.readLong();



      // Mark header size as the change in the number of bytes read.

      long headerSize = countStream.getBytesRead() - preHeaderCount;



      // Create the current block by reading blockSize bytes. Block sizes permitted by the Avro

      // specification are [32, 2^30], so the cast is safe.

      byte[] data = new byte[(int) blockSize];

      int bytesRead = IOUtils.readFully(stream, data);

      checkState(

          blockSize == bytesRead,

          "Only able to read %s/%s bytes in the block before EOF reached.",

          bytesRead,

          blockSize);

      currentBlock =

          new AvroBlock<>(

              data,

              numRecords,

              getCurrentSource().mode,

              metadata.getSchemaString(),

              metadata.getCodec());



      // Read the end of this block, which MUST be a sync marker for correctness.

      byte[] syncMarker = metadata.getSyncMarker();

      byte[] readSyncMarker = new byte[syncMarker.length];

      long syncMarkerOffset = startOfNextBlock + headerSize + blockSize;

      bytesRead = IOUtils.readFully(stream, readSyncMarker);

      checkState(

          bytesRead == syncMarker.length,

          "Only able to read %s/%s bytes of Avro sync marker at position %s before EOF reached.",

          bytesRead,

          syncMarker.length,

          syncMarkerOffset);

      if (!Arrays.equals(syncMarker, readSyncMarker)) {

        throw new IllegalStateException(

            String.format(

                "Expected the bytes [%d,%d) in file %s to be a sync marker, but found %s",

                syncMarkerOffset,

                syncMarkerOffset + syncMarker.length,

                getCurrentSource().getFileOrPatternSpec(),

                Arrays.toString(readSyncMarker)));

      }



      // Atomically update both the offset and size of the new current block.

      synchronized (progressLock) {

        currentBlockOffset = startOfNextBlock;

        // Total block size includes the header, block content, and trailing sync marker.

        currentBlockSizeBytes = headerSize + blockSize + syncMarker.length;

      }



      return true;

    }



    @Override

    public AvroBlock<T> getCurrentBlock() {

      return currentBlock;

    }



    @Override

    public long getCurrentBlockOffset() {

      synchronized (progressLock) {

        return currentBlockOffset;

      }

    }



    @Override

    public long getCurrentBlockSize() {

      synchronized (progressLock) {

        return currentBlockSizeBytes;

      }

    }



    @Override

    public long getSplitPointsRemaining() {

      if (isDone()) {

        return 0;

      }

      synchronized (progressLock) {

        if (currentBlockOffset + currentBlockSizeBytes >= getCurrentSource().getEndOffset()) {

          // This block is known to be the last block in the range.

          return 1;

        }

      }

      return super.getSplitPointsRemaining();

    }



    /**

     * Creates a {@link PushbackInputStream} that has a large enough pushback buffer to be able to

     * push back the syncBuffer.

     */

    private PushbackInputStream createStream(ReadableByteChannel channel) {

      return new PushbackInputStream(

          Channels.newInputStream(channel), metadata.getSyncMarker().length);

    }



    // Postcondition: the stream is positioned at the beginning of the first block after the start

    // of the current source, and currentBlockOffset is that position. Additionally,

    // currentBlockSizeBytes will be set to 0 indicating that the previous block was empty.

    @Override

    protected void startReading(ReadableByteChannel channel) throws IOException {

      try {

        metadata = readMetadataFromFile(getCurrentSource().getSingleFileMetadata().resourceId());

      } catch (IOException e) {

        throw new RuntimeException(

            "Error reading metadata from file " + getCurrentSource().getSingleFileMetadata(), e);

      }



      long startOffset = getCurrentSource().getStartOffset();

      byte[] syncMarker = metadata.getSyncMarker();

      long syncMarkerLength = syncMarker.length;



      if (startOffset != 0) {

        // Rewind in order to find the sync marker ending the previous block.

        long position = Math.max(0, startOffset - syncMarkerLength);

        ((SeekableByteChannel) channel).position(position);

        startOffset = position;

      }



      // Satisfy the postcondition.

      stream = createStream(channel);

      countStream = new CountingInputStream(stream);

      synchronized (progressLock) {

        currentBlockOffset = startOffset + advancePastNextSyncMarker(stream, syncMarker);

        currentBlockSizeBytes = 0;

      }

    }



    /**

     * Advances to the first byte after the next occurrence of the sync marker in the stream when

     * reading from the current offset. Returns the number of bytes consumed from the stream. Note

     * that this method requires a PushbackInputStream with a buffer at least as big as the marker

     * it is seeking.

     */

    static long advancePastNextSyncMarker(PushbackInputStream stream, byte[] syncMarker)

        throws IOException {

      Seeker seeker = new Seeker(syncMarker);

      byte[] syncBuffer = new byte[syncMarker.length];

      long totalBytesConsumed = 0;

      // Seek until either a sync marker is found or we reach the end of the file.

      int mark = -1; // Position of the last byte in the sync marker.

      int read; // Number of bytes read.

      do {

        read = stream.read(syncBuffer);

        if (read >= 0) {

          mark = seeker.find(syncBuffer, read);

          // Track the total number of bytes consumed.

          totalBytesConsumed += read;

        }

      } while (mark < 0 && read > 0);



      // If the sync marker was found, unread any bytes read past it and adjust the count.

      if (mark >= 0) {

        // The stream position after this call should be just past the sync marker, so we unread
        // the remaining buffer contents and subtract them from the total bytes consumed.

        stream.unread(syncBuffer, mark + 1, read - (mark + 1));

        totalBytesConsumed = totalBytesConsumed - (read - (mark + 1));

      }

      return totalBytesConsumed;

    }



    /**

     * A {@link Seeker} looks for a given marker within a byte buffer. Uses naive string matching

     * with a sliding window, as sync markers are small and random.

     */

    static class Seeker {

      // The marker to search for.

      private final byte[] marker;



      // Buffer used for the sliding window.

      private final byte[] searchBuffer;



      // Number of bytes available to be matched in the buffer.

      private int available = 0;



      /** Create a {@link Seeker} that looks for the given marker. */

      public Seeker(byte[] marker) {

        this.marker = marker;

        this.searchBuffer = new byte[marker.length];

      }



      /**

       * Find the marker in the byte buffer. Returns the index of the end of the marker in the

       * buffer. If the marker is not found, returns -1.

       *

       * <p>State is maintained between calls. If the marker was partially matched, a subsequent

       * call to find will resume matching the marker.

       *

       * @param buffer the bytes to search
       * @param length the number of valid bytes in {@code buffer}
       * @return the index of the end of the marker within the buffer, or -1 if the marker was
       *     not found.

       */

      public int find(byte[] buffer, int length) {

        for (int i = 0; i < length; i++) {

          System.arraycopy(searchBuffer, 1, searchBuffer, 0, searchBuffer.length - 1);

          searchBuffer[searchBuffer.length - 1] = buffer[i];

          available = Math.min(available + 1, searchBuffer.length);

          if (ByteBuffer.wrap(searchBuffer, searchBuffer.length - available, available)

              .equals(ByteBuffer.wrap(marker))) {

            available = 0;

            return i;

          }

        }

        return -1;

      }
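
      // State carries over between calls, so a marker may straddle two buffers. For example:
      //
      //   Seeker seeker = new Seeker(new byte[] {1, 2, 3, 4});
      //   seeker.find(new byte[] {0, 1, 2}, 3); // returns -1; the partial match is retained
      //   seeker.find(new byte[] {3, 4, 9}, 3); // returns 1, the index of the marker's last byte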

    }

  }

}